iT邦幫忙

2023 iThome 鐵人賽

DAY 9
0

Hadoop 是用 Java 編程的,因此用 Java 來開發相關應用是最方便的,不過 Hadoop 也支援用其他語言開發,如 C++PythonRuby。Python 的性能雖然比不上 Java 或 C++,但優點是有豐富的數據處理庫,如 PandasNumPy,因此應用程序如果涉及大量的數據處理和轉換,Python 會是個不錯的選擇。

  • 本日目標:Word Count
    想學 Hadoop MapReduce 一定會從 Word Count 開始,可以說是跟 Hello World 一樣的存在,今天我會示範如何使用 Python 來開發 Word Count 的 MapReduce 程式。

程式碼
這次參賽的程式碼都會放在 Big-Data-Framework-30-days,建議大家直接把整個 repo clone 下來,然後參考 README 進行基本設置,接著直接 cd 到今天的資料夾內。

Hadoop Streaming

前幾天有提到 MapReudce 的流程是 Input > Split > Map > Shuffle and Sort > Reduce > Output,而實際上我們需要開發的只有 Map 以及 Reduce 兩個部分,其餘部分都交由 Hadoop 來幫我們處理就好~

Hadoop Streaming 就是一個協助我們創建和運行 MapReduce 作業的工具,我們可以撰寫自己的腳本來充當 MapperReducer,用法如下:

mapred streaming \
    -input myInputDirs \       # 
    -output myOutputDir \
    -mapper /path/to/mapper \
    -reducer /path/to/reducer \
    -file /path/to/file

網路上很多是舊版做法 (hadoop jar $HADOOP_HOME/hadoop-streaming.jar),現在已經不能用了喔!

  • options 解釋:
    • inpnt:輸入 mapper 的目錄路徑
    • output:reducer 輸出的目錄路徑
    • mapper:mapper 路徑
    • reducer:reducer 路徑
    • file:將檔案分發到集群中的各個節點,並被保存在各個節點的工作路徑中,供各個節點使用

MapReduce with Python

由於我們是使用 Python 來開發程式,我們無法直接使用 Hadoop API (JAVA) 來傳輸數據,所以要改為使用標準輸入 (STDIN)標準輸出(STDOUT) 來作為數據傳遞通道,優點是通用性,因為所有程式語言都支持標準 I/O 通道,缺點是效率不一定是最好的。

Mapper

Mapper 負責讀取輸入數據,將每個拆分的數據進行處理,並輸出鍵值對 (key-value pair)。

#!/usr/bin/env python
import sys

for line in sys.stdin:

    line = line.strip().lower()
    words = line.split()
    
    # give every word 1 count    
    for word in words:
        print(f'{word}\t1')

這裡程式邏輯很簡單,其實單純是做數據格式轉換而已,斷詞的部分簡單用 split 來處理,如果想要斷的乾淨點或做其他轉換可以自己修改程式碼。

Reducer

Reducer 負責讀取 Mapper 的輸出結果,將相同key的數據進行合併、計算或其他處理,並輸出鍵值對 (key-value pair) 。

#!/usr/bin/env python
import sys

curr_word = None
curr_count = 0

for line in sys.stdin:

    line = line.strip()
    word, count = line.split('\t')
    count = int(count)
    
    # the if condition works only when the input is sorted
    if curr_word != word:
        if curr_word : 
            print(f"{curr_word}\t{curr_count}")
        curr_word = word
        curr_count = 0
    
    curr_count += count

# output last word
if curr_word:
    print(f"{curr_word}\t{curr_count}")

這裡程式邏輯也很簡單,遇到相同的詞就把 count 加起來,遇到不同的就重設並印出前一個詞的合計。

本地測試

  • 在使用 Hadoop Streaming 之前,我們先在本地端試試,在終端機執行下面的指令 (記得修改檔案路徑),可以注意這裡有一個 sort 的動作。
    # echo "some testing text" | python <path/to/mapper.py> | sort | python <path/to/reducer.py>
    echo "some testing text" | python 30days/day09/mapper.py | sort | python 30days/day09/reducer.py
    
  • 結果如下:
    https://ithelp.ithome.com.tw/upload/images/20230924/201389391R4g209jpq.png

Hadoop 測試

  1. 啟動 HDFS (& YARN)
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
  1. 上傳測試檔案

在 day09 資料夾中我準備了一些測試用檔案,執行 uploading.py 即可上傳測試檔案 (要放在同個資料夾內),程式碼放下面大家自己看。

python 30days/day09/uploading.py
  • uploading.py
    # make connections
    from hdfs import InsecureClient
    client = InsecureClient("http://localhost:9870/", user='mengchiehliu')
    
    # make directory
    client.makedirs('day09/input')
    
    # upload testing files
    import os
    local_dir = os.path.dirname(__file__)
    client.upload(hdfs_path="day09/input/test_text_1.txt", local_path=f'{local_dir}/test_text_1.txt', overwrite=True)
    client.upload(hdfs_path="day09/input/test_text_2.txt", local_path=f'{local_dir}/test_text_2.txt', overwrite=True)
    client.upload(hdfs_path="day09/input/test_text_3.txt", local_path=f'{local_dir}/test_text_3.txt', overwrite=True)
    
  1. Hadoop Streaming
mapred streaming \
    -input day09/input \
    -output day09/output \
    -mapper 30days/day09/mapper.py \
    -reducer 30days/day09/reducer.py

因為是在偽分布模式 (單一節點) 執行,所以不用帶 file option

事實上,因為 reducer 做的內容就是加總而已,因此 Hadoop 早就幫我們封裝成一個 aggregate 功能,所以我們也直接寫:

mapred streaming \
    -input day09/input \
    -output day09/output \
    -mapper 30days/day09/mapper.py \
    -reducer aggregate 
  1. 查看結果
  • 終端機輸入:
    hadoop fs -cat day09/output/*
    
  • 結果如下:
    https://ithelp.ithome.com.tw/upload/images/20230924/20138939DUJd3wVvt3.png

今天介紹了 MapReduce 的基本應用 WordCount,簡單複習一下,我們使用了 Hadoop Streaming,讀取存儲在 HDFS 內的文字檔,經過 MapperReducer 運算後產出每個詞的出現頻率。

預告

明天會介紹如何用 Hive 來在 Hadoop 中處理結構化數據。

參考資料

Hadoop Streaming
Writing An Hadoop MapReduce Program In Python


上一篇
Day08 - HDFS 基本操作 (FileSystem Shell & Python)
下一篇
Day10 - Hive 介紹
系列文
30天認識主流大數據框架:Hadoop + Spark + Flink30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言